[[annotation-error-handling]]
= Handling Exceptions

This section describes how to handle various exceptions that may arise when you use Spring for Apache Kafka.

[[listener-error-handlers]]
== Listener Error Handlers

Starting with version 2.0, the `@KafkaListener` annotation has a new attribute: `errorHandler`.

You can use the `errorHandler` to provide the bean name of a `KafkaListenerErrorHandler` implementation.
This functional interface has one method, as the following listing shows:

[source, java]
----
@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}
----

You have access to the spring-messaging `Message<?>` object produced by the message converter and the exception that was thrown by the listener, which is wrapped in a `ListenerExecutionFailedException`.
The error handler can throw the original or a new exception, which is thrown to the container.
Anything returned by the error handler is ignored.
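
As a minimal sketch of wiring the attribute, the following configuration registers an error handler bean and references it by name from a listener.
The bean name `loggingListenerErrorHandler`, the listener id, and the topic `orders` are hypothetical names chosen for illustration.

[source, java]
----
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.stereotype.Component;

@Configuration
class ListenerErrorHandlingConfig {

    // Hypothetical bean name; the listener below refers to it in its errorHandler attribute.
    @Bean
    KafkaListenerErrorHandler loggingListenerErrorHandler() {
        return (message, exception) -> {
            System.err.println("Listener failed for payload: " + message.getPayload());
            // Rethrow so that the failure is still thrown to the container.
            throw exception;
        };
    }
}

@Component
class OrderListener {

    @KafkaListener(id = "orders", topics = "orders", errorHandler = "loggingListenerErrorHandler")
    void listen(String order) {
        // Any exception thrown here is wrapped and passed to the error handler above.
    }
}
----

Rethrowing the `ListenerExecutionFailedException` keeps the container's own error handling in play; a handler that returns normally instead treats the failure as handled.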

Starting with version 2.7, you can set the `rawRecordHeader` property on the `MessagingMessageConverter` and `BatchMessagingMessageConverter` which causes the raw `ConsumerRecord` to be added to the converted `Message<?>` in the `KafkaHeaders.RAW_DATA` header.
This is useful, for example, if you wish to use a `DeadLetterPublishingRecoverer` in a listener error handler.
It might be used in a request/reply scenario where you wish to send a failure result to the sender, after some number of retries, after capturing the failed record in a dead letter topic.

[source, java]
----
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}
----

`KafkaListenerErrorHandler` has a sub-interface (`ConsumerAwareListenerErrorHandler`) that has access to the consumer object, through the following method:

[source, java]
----
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
----

Another sub-interface (`ManualAckListenerErrorHandler`) provides access to the `Acknowledgment` object when using manual `AckMode`+++s+++.

[source, java]
----
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
        Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
----

In either case, you should NOT perform any seeks on the consumer because the container would be unaware of them.

[[error-handlers]]
== Container Error Handlers

Starting with version 2.8, the legacy `ErrorHandler` and `BatchErrorHandler` interfaces have been superseded by a new `CommonErrorHandler`.
These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener.
The framework provides `CommonErrorHandler` implementations that replace most of the legacy framework error handlers.

See xref:kafka/annotation-error-handling.adoc#migrating-legacy-eh[Migrating Custom Legacy Error Handler Implementations to `CommonErrorHandler`] for information about migrating custom error handlers to `CommonErrorHandler`.

When transactions are being used, no error handlers are configured, by default, so that the exception will roll back the transaction.
Error handling for transactional containers is performed by the xref:kafka/annotation-error-handling.adoc#after-rollback[`AfterRollbackProcessor`].
If you provide a custom error handler when using transactions, it must throw an exception if you want the transaction rolled back.

The `CommonErrorHandler` interface has a default method `isAckAfterHandle()` which is called by the container to determine whether the offset(s) should be committed if the error handler returns without throwing an exception; it returns true by default.

Typically, the error handlers provided by the framework will throw an exception when the error is not "handled" (e.g. after performing a seek operation).
By default, such exceptions are logged by the container at `ERROR` level.
All of the framework error handlers extend `KafkaExceptionLogLevelAware` which allows you to control the level at which these exceptions are logged.

[source, java]
----
/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}
----

You can specify a global error handler to be used for all listeners in the container factory.
The following example shows how to do so:

[source, java]
----
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}
----

By default, if an annotated listener method throws an exception, it is thrown to the container, and the message is handled according to the container configuration.

The container commits any pending offset commits before calling the error handler.

If you are using Spring Boot, you simply need to add the error handler as a `@Bean` and Boot will add it to the auto-configured factory.
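
A minimal sketch of such a bean follows.
The class and bean names are hypothetical, and the back off values and log level are arbitrary choices for illustration.

[source, java]
----
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
class ErrorHandlerConfig {

    @Bean
    CommonErrorHandler errorHandler() {
        // Two retries, one second apart (illustrative values).
        DefaultErrorHandler handler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
        // Log exceptions thrown by this handler at WARN instead of the default ERROR.
        handler.setLogLevel(KafkaException.Level.WARN);
        return handler;
    }
}
----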

[[backoff-handlers]]
== Back Off Handlers

Error handlers such as the xref:kafka/annotation-error-handling.adoc#default-eh[DefaultErrorHandler] use a `BackOff` to determine how long to wait before retrying a delivery.
Starting with version 2.9, you can configure a custom `BackOffHandler`.
The default handler simply suspends the thread until the back off time passes (or the container is stopped).
The framework also provides the `ContainerPausingBackOffHandler` which pauses the listener container until the back off time passes and then resumes the container.
This is useful when the delays are longer than the `max.poll.interval.ms` consumer property.
Note that the resolution of the actual back off time will be affected by the `pollTimeout` container property.

[[default-eh]]
== DefaultErrorHandler

This new error handler replaces the `SeekToCurrentErrorHandler` and `RecoveringBatchErrorHandler`, which have been the default error handlers for several releases now.
One difference is that the fallback behavior for batch listeners (when an exception other than a `BatchListenerFailedException` is thrown) is the equivalent of the xref:kafka/annotation-error-handling.adoc#retrying-batch-eh[Retrying Complete Batches].

IMPORTANT: Starting with version 2.9, the `DefaultErrorHandler` can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed below, but without actually seeking.
Instead, the records are retained by the listener container and resubmitted to the listener after the error handler exits (and after performing a single paused `poll()`, to keep the consumer alive; if xref:retrytopic.adoc[Non-Blocking Retries] or a `ContainerPausingBackOffHandler` are being used, the pause may extend over multiple polls).
The error handler returns a result to the container that indicates whether the current failing record can be resubmitted, or whether it was recovered and so will not be sent to the listener again.
To enable this mode, set the property `seekAfterError` to `false`.

The error handler can recover (skip) a record that keeps failing.
By default, after ten failures, the failed record is logged (at the `ERROR` level).
You can configure the handler with a custom recoverer (`BiConsumer`) and a `BackOff` that controls the delivery attempts and delays between each.
Using a `FixedBackOff` with `FixedBackOff.UNLIMITED_ATTEMPTS` causes (effectively) infinite retries.
The following example configures recovery after three tries:

[source, java]
----
DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));
----

To configure the listener container with a customized instance of this handler, add it to the container factory.

For example, with the `@KafkaListener` container factory, you can add `DefaultErrorHandler` as follows:

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}
----

For a record listener, this will retry a delivery up to 2 times (3 delivery attempts) with a back off of 1 second, instead of the default configuration (`FixedBackOff(0L, 9)`).
Failures are simply logged after retries are exhausted.
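
The arithmetic behind these numbers can be sketched in plain Java.
The class `FixedBackOffMath` is hypothetical and not part of the framework; it only restates the rule that the second `FixedBackOff` argument counts retries, so a record is delivered one more time than that value.

[source, java]
----
final class FixedBackOffMath {

    private FixedBackOffMath() {
    }

    /** Total deliveries: the initial attempt plus one delivery per retry. */
    static long deliveryAttempts(long maxRetries) {
        return maxRetries + 1;
    }

    /** Total time spent waiting between deliveries before the record is recovered. */
    static long totalBackOffMillis(long intervalMillis, long maxRetries) {
        return intervalMillis * maxRetries;
    }
}
----

With `FixedBackOff(1000L, 2L)` this gives three deliveries and two seconds of waiting in total; with the default `FixedBackOff(0L, 9)` it gives ten deliveries with no waiting.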

As an example, if the `poll` returns six records (two from each partition 0, 1, 2) and the listener throws an exception on the fourth record, the container acknowledges the first three messages by committing their offsets.
The `DefaultErrorHandler` seeks to offset 1 for partition 1 and offset 0 for partition 2.
The next `poll()` returns the three unprocessed records.

If the `AckMode` was `BATCH`, the container commits the offsets for the first two partitions before calling the error handler.
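
The seek positions in this example can be reproduced with a small plain-Java model.
`SeekPlanner` and its nested `Rec` type are hypothetical stand-ins, not framework classes; the sketch assumes that the records of each partition arrive in ascending offset order, as they do in a Kafka poll.

[source, java]
----
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SeekPlanner {

    /** Minimal stand-in for a consumer record: only a partition and an offset. */
    static final class Rec {
        final int partition;
        final long offset;

        Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private SeekPlanner() {
    }

    /**
     * For the failed record and every record after it, remember the first offset
     * seen for each partition; the next poll must start from there.
     */
    static Map<Integer, Long> seekPositions(List<Rec> polled, int failedIndex) {
        Map<Integer, Long> seeks = new LinkedHashMap<>();
        for (Rec rec : polled.subList(failedIndex, polled.size())) {
            seeks.putIfAbsent(rec.partition, rec.offset);
        }
        return seeks;
    }

    /** Six records: two from each of the partitions 0, 1 and 2. */
    static List<Rec> examplePoll() {
        return List.of(new Rec(0, 0), new Rec(0, 1), new Rec(1, 0),
                new Rec(1, 1), new Rec(2, 0), new Rec(2, 1));
    }
}
----

Calling `seekPositions(examplePoll(), 3)` (the fourth record fails) yields offset 1 for partition 1 and offset 0 for partition 2, with no seek for partition 0.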

For a batch listener, the listener must throw a `BatchListenerFailedException` indicating which records in the batch failed.

The sequence of events is:

* Commit the offsets of the records before the index.
* If retries are not exhausted, perform seeks so that all the remaining records (including the failed record) will be redelivered.
* If retries are exhausted, attempt recovery of the failed record (default log only) and perform seeks so that the remaining records (excluding the failed record) will be redelivered.
The recovered record's offset is committed.
* If retries are exhausted and recovery fails, seeks are performed as if retries are not exhausted.
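
The sequence above can be summarized as a small decision function.
This is a plain-Java sketch, not the framework's implementation: `BatchFailurePlan` is a hypothetical class, and it treats the batch as one ordered list, ignoring the per-partition bookkeeping that the container performs.

[source, java]
----
final class BatchFailurePlan {

    /** Number of leading records in the batch whose offsets are committed. */
    final int committed;

    /** Index of the first record that is redelivered. */
    final int redeliverFrom;

    private BatchFailurePlan(int committed, int redeliverFrom) {
        this.committed = committed;
        this.redeliverFrom = redeliverFrom;
    }

    static BatchFailurePlan plan(int failedIndex, boolean retriesExhausted, boolean recoverySucceeded) {
        if (retriesExhausted && recoverySucceeded) {
            // The failed record was recovered: commit it as well and redeliver only what follows.
            return new BatchFailurePlan(failedIndex + 1, failedIndex + 1);
        }
        // Retries remain, or recovery failed: commit the records before the index
        // and redeliver everything from the failed record onwards.
        return new BatchFailurePlan(failedIndex, failedIndex);
    }
}
----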

IMPORTANT: Starting with version 2.9, the `DefaultErrorHandler` can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed above, but without actually seeking.
Instead, the error handler creates a new `ConsumerRecords<?, ?>` containing just the unprocessed records which will then be submitted to the listener (after performing a single paused `poll()`, to keep the consumer alive).
To enable this mode, set the property `seekAfterError` to `false`.


The default recoverer logs the failed record after retries are exhausted.
You can use a custom recoverer, or one provided by the framework such as the xref:kafka/annotation-error-handling.adoc#dead-letters[`DeadLetterPublishingRecoverer`].

When using a POJO batch listener (e.g. `List<Thing>`), and you don't have the full consumer record to add to the exception, you can just add the index of the record that failed:

[source, java]
----
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}
----

When the container is configured with `AckMode.MANUAL_IMMEDIATE`, the error handler can be configured to commit the offset of recovered records; set the `commitRecovered` property to `true`.

See also xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records].

When using transactions, similar functionality is provided by the `DefaultAfterRollbackProcessor`.
See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor].

The `DefaultErrorHandler` considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure.
The exceptions that are considered fatal, by default, are:

* `DeserializationException`
* `MessageConversionException`
* `ConversionException`
* `MethodArgumentResolutionException`
* `NoSuchMethodException`
* `ClassCastException`

since these exceptions are unlikely to be resolved on a retried delivery.

You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions.
See the Javadocs for `DefaultErrorHandler.addNotRetryableExceptions()` and `DefaultErrorHandler.setClassifications()` for more information, as well as those for the `spring-retry` `BinaryExceptionClassifier`.

Here is an example that adds `IllegalArgumentException` to the not-retryable exceptions:

[source, java]
----
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}
----
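
To illustrate what the classification means for retries, here is a simplified plain-Java model.
`RetryClassificationModel` is hypothetical and is not how the framework implements classification; it assumes that a registered type also matches its subclasses, and it models only the JDK members of the default list.

[source, java]
----
import java.util.LinkedHashSet;
import java.util.Set;

final class RetryClassificationModel {

    private final Set<Class<? extends Exception>> notRetryable = new LinkedHashSet<>();

    RetryClassificationModel() {
        // Only the JDK exception types from the default fatal list are modeled here.
        notRetryable.add(NoSuchMethodException.class);
        notRetryable.add(ClassCastException.class);
    }

    void addNotRetryable(Class<? extends Exception> type) {
        notRetryable.add(type);
    }

    /** An exception is retried unless its type, or one of its supertypes, is registered. */
    boolean isRetryable(Exception ex) {
        for (Class<? extends Exception> type : notRetryable) {
            if (type.isInstance(ex)) {
                return false;
            }
        }
        return true;
    }
}
----

In this model, an `IllegalArgumentException` is retried until the type is added, after which it (and, under the stated assumption, subclasses such as `NumberFormatException`) goes straight to the recoverer.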

The error handler can be configured with one or more `RetryListener`+++s+++, receiving notifications of retry and recovery progress.
Starting with version 2.8.10, methods for batch listeners were added.

[source, java]
----
@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
    }

}
----

See the JavaDocs for more information.

IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
If the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
To skip retries after a recovery failure, set the error handler's `resetStateOnRecoveryFailure` to `false`.

You can provide the error handler with a `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception:

[source, java]
----
handler.setBackOffFunction((record, ex) -> { ... });
----

If the function returns `null`, the handler's default `BackOff` will be used.

Set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures.
When `false` (the default before version 2.9), the exception type is not considered.

Starting with version 2.9, this is now `true` by default.
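
The effect of this property on the attempt count can be pictured with a small plain-Java model.
`RetryStateModel` is a hypothetical class for illustration only; it tracks a single record and compares exact exception classes, which is an assumption of the sketch rather than a statement about the framework's internals.

[source, java]
----
final class RetryStateModel {

    private final boolean resetOnExceptionChange;
    private Class<?> lastExceptionType;
    private int attempts;

    RetryStateModel(boolean resetOnExceptionChange) {
        this.resetOnExceptionChange = resetOnExceptionChange;
    }

    /** Records a failure and returns the attempt number within the current retry sequence. */
    int recordFailure(Exception ex) {
        if (resetOnExceptionChange && lastExceptionType != null && lastExceptionType != ex.getClass()) {
            // A different exception type starts a new retry sequence.
            attempts = 0;
        }
        lastExceptionType = ex.getClass();
        return ++attempts;
    }
}
----

With the flag set to `true`, two `IllegalStateException` failures followed by an `IOException` count as attempts 1, 2, 1; with `false`, they count as 1, 2, 3.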

Also see xref:kafka/annotation-error-handling.adoc#delivery-header[Delivery Attempts Header].

[[batch-listener-conv-errors]]
== Conversion Errors with Batch Error Handlers

Starting with version 2.8, batch listeners can now properly handle conversion errors, when using a `MessageConverter` with a `ByteArrayDeserializer`, a `BytesDeserializer` or a `StringDeserializer`, as well as a `DefaultErrorHandler`.
When a conversion error occurs, the payload is set to null and a deserialization exception is added to the record headers, similar to the `ErrorHandlingDeserializer`.
A list of `ConversionException`+++s+++ is available in the listener so the listener can throw a `BatchListenerFailedException` indicating the first index at which a conversion exception occurred.

Example:

[source, java]
----
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(thing);
    }
}
----

[[retrying-batch-eh]]
== Retrying Complete Batches

This is now the fallback behavior of the `DefaultErrorHandler` for a batch listener where the listener throws an exception other than a `BatchListenerFailedException`.

There is no guarantee that, when a batch is redelivered, the batch has the same number of records and/or the redelivered records are in the same order.
It is impossible, therefore, to easily maintain retry state for a batch.
The `FallbackBatchErrorHandler` takes the following approach.
If a batch listener throws an exception that is not a `BatchListenerFailedException`, the retries are performed from the in-memory batch of records.
In order to avoid a rebalance during an extended retry sequence, the error handler pauses the consumer, polls it before sleeping for the back off, for each retry, and calls the listener again.
If/when retries are exhausted, the `ConsumerRecordRecoverer` is called for each record in the batch.
If the recoverer throws an exception, or the thread is interrupted during its sleep, the batch of records will be redelivered on the next poll.
Before exiting, regardless of the outcome, the consumer is resumed.

IMPORTANT: This mechanism cannot be used with transactions.

While waiting for a `BackOff` interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the `stop()` rather than causing a delay.
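
The idea of sleeping in short slices while watching for a stop can be sketched in plain Java as follows.
`StoppableSleep` is a hypothetical helper, not the framework's code; the slice length and the `running` supplier are assumptions made for the example.

[source, java]
----
import java.util.function.BooleanSupplier;

final class StoppableSleep {

    private StoppableSleep() {
    }

    /**
     * Waits for up to delayMillis in slices of at most sliceMillis, returning early
     * when the supplier reports that the container is no longer running.
     *
     * @return true if the full delay elapsed, false if the wait was cut short
     */
    static boolean sleep(long delayMillis, long sliceMillis, BooleanSupplier running) {
        long deadline = System.nanoTime() + delayMillis * 1_000_000L;
        while (true) {
            if (!running.getAsBoolean()) {
                return false;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return true;
            }
            long remainingMillis = Math.max(1L, remainingNanos / 1_000_000L);
            try {
                Thread.sleep(Math.min(sliceMillis, remainingMillis));
            }
            catch (InterruptedException e) {
                // Preserve the interrupt status and treat the interruption as a stop.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
----

Because the running check happens once per slice, a stop request is noticed within roughly one slice rather than after the whole back off interval.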

[[container-stopping-error-handlers]]
== Container Stopping Error Handlers

The `CommonContainerStoppingErrorHandler` stops the container if the listener throws an exception.
For record listeners, when the `AckMode` is `RECORD`, offsets for already processed records are committed.
For record listeners, when the `AckMode` is any manual value, offsets for already acknowledged records are committed.
For record listeners, when the `AckMode` is `BATCH`, or for batch listeners, the entire batch is replayed when the container is restarted.

After the container stops, an exception that wraps the `ListenerExecutionFailedException` is thrown.
This is to cause the transaction to roll back (if transactions are enabled).

[[cond-eh]]
== Delegating Error Handler

The `CommonDelegatingErrorHandler` can delegate to different error handlers, depending on the exception type.
For example, you may wish to invoke a `DefaultErrorHandler` for most exceptions, and a `CommonContainerStoppingErrorHandler` for others.

All delegates must share the same compatible properties (`ackAfterHandle`, `seekAfterError` ...).
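
A minimal configuration sketch follows.
The class name, bean name, back off values, and the choice of `IllegalStateException` as the exception that stops the container are all illustrative assumptions; check the Javadocs of `CommonDelegatingErrorHandler` for the exact contract.

[source, java]
----
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
import org.springframework.kafka.listener.CommonDelegatingErrorHandler;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
class DelegatingErrorHandlerConfig {

    @Bean
    CommonErrorHandler delegatingErrorHandler() {
        // Handler used for any exception type without a specific delegate.
        CommonDelegatingErrorHandler handler =
                new CommonDelegatingErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
        // Illustrative choice: stop the container when the listener throws IllegalStateException.
        handler.addDelegate(IllegalStateException.class, new CommonContainerStoppingErrorHandler());
        return handler;
    }
}
----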

[[log-eh]]
== Logging Error Handler

The `CommonLoggingErrorHandler` simply logs the exception; with a record listener, the remaining records from the previous poll are passed to the listener.
For a batch listener, all the records in the batch are logged.

[[mixed-eh]]
== Using Different Common Error Handlers for Record and Batch Listeners

If you wish to use a different error handling strategy for record and batch listeners, the `CommonMixedErrorHandler` is provided allowing the configuration of a specific error handler for each listener type.

[[eh-summary]]
== Common Error Handler Summary

* `DefaultErrorHandler`
* `CommonContainerStoppingErrorHandler`
* `CommonDelegatingErrorHandler`
* `CommonLoggingErrorHandler`
* `CommonMixedErrorHandler`

[[legacy-eh]]
== Legacy Error Handlers and Their Replacements

[cols="16,16" options="header"]
|===
|Legacy Error Handler
|Replacement

|`LoggingErrorHandler`
|`CommonLoggingErrorHandler`

|`BatchLoggingErrorHandler`
|`CommonLoggingErrorHandler`

|`ConditionalDelegatingErrorHandler`
|`CommonDelegatingErrorHandler`

|`ConditionalDelegatingBatchErrorHandler`
|`CommonDelegatingErrorHandler`

|`ContainerStoppingErrorHandler`
|`CommonContainerStoppingErrorHandler`

|`ContainerStoppingBatchErrorHandler`
|`CommonContainerStoppingErrorHandler`

|`SeekToCurrentErrorHandler`
|`DefaultErrorHandler`

|`SeekToCurrentBatchErrorHandler`
|No replacement, use `DefaultErrorHandler` with an infinite `BackOff`.

|`RecoveringBatchErrorHandler`
|`DefaultErrorHandler`

|`RetryingBatchErrorHandler`
|No replacement; use `DefaultErrorHandler` and throw an exception other than `BatchListenerFailedException`.
|===

[[migrating-legacy-eh]]
=== Migrating Custom Legacy Error Handler Implementations to `CommonErrorHandler`

Refer to the JavaDocs in `CommonErrorHandler`.

To replace an `ErrorHandler` or `ConsumerAwareErrorHandler` implementation, you should implement `handleOne()` and leave `seeksAfterHandle()` to return `false` (default).
You should also implement `handleOtherException()` to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).

To replace a `RemainingRecordsErrorHandler` implementation, you should implement `handleRemaining()` and override `seeksAfterHandle()` to return `true` (the error handler must perform the necessary seeks).
You should also implement `handleOtherException()` to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).

To replace any `BatchErrorHandler` implementation, you should implement `handleBatch()`.
You should also implement `handleOtherException()` to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).

[[after-rollback]]
== After Rollback Processor

When using transactions, if the listener throws an exception (and an error handler, if present, throws an exception), the transaction is rolled back.
By default, any unprocessed records (including the failed record) are re-fetched on the next poll.
This is achieved by performing `seek` operations in the `DefaultAfterRollbackProcessor`.
With a batch listener, the entire batch of records is reprocessed (the container has no knowledge of which record in the batch failed).
To modify this behavior, you can configure the listener container with a custom `AfterRollbackProcessor`.
For example, with a record-based listener, you might want to keep track of the failed record and give up after some number of attempts, perhaps by publishing it to a dead-letter topic.

Starting with version 2.2, the `DefaultAfterRollbackProcessor` can now recover (skip) a record that keeps failing.
By default, after ten failures, the failed record is logged (at the `ERROR` level).
You can configure the processor with a custom recoverer (`BiConsumer`) and a `BackOff` that controls the delivery attempts and the delay between each.
Using a `FixedBackOff` with `FixedBackOff.UNLIMITED_ATTEMPTS` causes (effectively) infinite retries.
The following example configures recovery after three tries:

[source, java]
----
AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor<>((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));
----

When you do not use transactions, you can achieve similar functionality by configuring a `DefaultErrorHandler`.
See xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers].

Starting with version 3.2, the processor can recover (skip) an entire batch of records that keeps failing.
Call `ContainerProperties.setBatchRecoverAfterRollback(true)` to enable this feature.

IMPORTANT: By default, recovery is not possible with a batch listener, because the framework does not know which record in the batch keeps failing.
In such cases, the application listener must handle a record that keeps failing.

See also xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records].

Starting with version 2.2.5, the `DefaultAfterRollbackProcessor` can be invoked in a new transaction (started after the failed transaction rolls back).
Then, if you are using the `DeadLetterPublishingRecoverer` to publish a failed record, the processor will send the recovered record's offset in the original topic/partition to the transaction.
To enable this feature, set the `commitRecovered` and `kafkaTemplate` properties on the `DefaultAfterRollbackProcessor`.

IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the processor's `resetStateOnRecoveryFailure` property to `false`.

Starting with version 2.6, you can now provide the processor with a `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception:

[source, java]
----
processor.setBackOffFunction((record, ex) -> { ... });
----

If the function returns `null`, the processor's default `BackOff` will be used.

Starting with version 2.6.3, set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures.
By default, the exception type is not considered.

Starting with version 2.3.1, similar to the `DefaultErrorHandler`, the `DefaultAfterRollbackProcessor` considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure.
The exceptions that are considered fatal, by default, are:

* `DeserializationException`
* `MessageConversionException`
* `ConversionException`
* `MethodArgumentResolutionException`
* `NoSuchMethodException`
* `ClassCastException`

since these exceptions are unlikely to be resolved on a retried delivery.

You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions.
See the Javadocs for `DefaultAfterRollbackProcessor.setClassifications()` for more information, as well as those for the `spring-retry` `BinaryExceptionClassifier`.

Here is an example that adds `IllegalArgumentException` to the not-retryable exceptions:

[source, java]
----
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableExceptions(IllegalArgumentException.class);
    return processor;
}
----

Also see xref:kafka/annotation-error-handling.adoc#delivery-header[Delivery Attempts Header].

IMPORTANT: With current `kafka-clients`, the container cannot detect whether a `ProducerFencedException` is caused by a rebalance or if the producer's `transactional.id` has been revoked due to a timeout or expiry.
Because, in most cases, it is caused by a rebalance, the container does not call the `AfterRollbackProcessor` (seeking the partitions would be inappropriate, since they are no longer assigned to this consumer).
If you ensure the timeout is large enough to process each transaction and periodically perform an "empty" transaction (e.g. via a `ListenerContainerIdleEvent`) you can avoid fencing due to timeout and expiry.
Or, you can set the `stopContainerWhenFenced` container property to `true` and the container will stop, avoiding the loss of records.
You can consume a `ConsumerStoppedEvent` and check the `Reason` property for `FENCED` to detect this condition.
Since the event also has a reference to the container, you can restart the container using this event.

Starting with version 2.7, while waiting for a `BackOff` interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the `stop()` rather than causing a delay.

Starting with version 2.7, the processor can be configured with one or more `RetryListener`+++s+++, receiving notifications of retry and recovery progress.

[source, java]
----
@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}
----

See the JavaDocs for more information.

[[delivery-header]]
== Delivery Attempts Header

The following applies to record listeners only, not batch listeners.

Starting with version 2.5, when using an `ErrorHandler` or `AfterRollbackProcessor` that implements `DeliveryAttemptAware`, it is possible to enable the addition of the `KafkaHeaders.DELIVERY_ATTEMPT` header (`kafka_deliveryAttempt`) to the record.
The value of this header is an incrementing integer starting at 1.
When receiving a raw `ConsumerRecord<?, ?>` the integer is in a `byte[4]`.

[source, java]
----
int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();
----
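
To make the `byte[4]` layout concrete, the following plain-Java sketch round-trips an attempt number through the same big-endian representation that the snippet above reads.
`DeliveryAttemptCodec` is a hypothetical helper; its `encode` method is an assumption that simply mirrors the documented decoding and is not taken from the framework.

[source, java]
----
import java.nio.ByteBuffer;

final class DeliveryAttemptCodec {

    private DeliveryAttemptCodec() {
    }

    /** Writes the attempt number as four bytes in ByteBuffer's default big-endian order. */
    static byte[] encode(int attempt) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(attempt).array();
    }

    /** Reads the attempt number back from the raw header value. */
    static int decode(byte[] headerValue) {
        return ByteBuffer.wrap(headerValue).getInt();
    }
}
----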

When using `@KafkaListener` with the `DefaultKafkaHeaderMapper` or `SimpleKafkaHeaderMapper`, it can be obtained by adding `@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery` as a parameter to the listener method.

To enable population of this header, set the container property `deliveryAttemptHeader` to `true`.
It is disabled by default to avoid the (small) overhead of looking up the state for each record and adding the header.

The `DefaultErrorHandler` and `DefaultAfterRollbackProcessor` support this feature.
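
Putting these pieces together, a configuration might look like the following sketch.
The class name, listener id, topic, and back off values are hypothetical; the `ConsumerFactory` is assumed to be defined elsewhere in the application.

[source, java]
----
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
class DeliveryAttemptConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // The header is only populated when this container property is enabled.
        factory.getContainerProperties().setDeliveryAttemptHeader(true);
        factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
        return factory;
    }

    @KafkaListener(id = "attempts", topics = "someTopic")
    void listen(String value, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
        System.out.println("Delivery attempt " + delivery + " for " + value);
    }
}
----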

[[li-header]]
== Listener Info Header

In some cases, it is useful to be able to know which container a listener is running in.

Starting with version 2.8.4, you can now set the `listenerInfo` property on the listener container, or set the `info` attribute on the `@KafkaListener` annotation.
Then, the container will add this in the `KafkaHeaders.LISTENER_INFO` header to all incoming messages; it can then be used in record interceptors, filters, etc., or in the listener itself.

[source, java]
----
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}
----

When used in a `RecordInterceptor` or `RecordFilterStrategy` implementation, the header is in the consumer record as a byte array, converted using the `KafkaListenerAnnotationBeanPostProcessor`+++'+++s `charSet` property.
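
In such a component, the raw header value has to be decoded by hand.
The following plain-Java sketch shows the conversion; `ListenerInfoDecoder` is a hypothetical helper, and UTF-8 is an assumption that should match whatever `charSet` the post processor is configured with.

[source, java]
----
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class ListenerInfoDecoder {

    private ListenerInfoDecoder() {
    }

    /** Decodes the raw header bytes using the given character set. */
    static String decode(byte[] headerValue, Charset charset) {
        return new String(headerValue, charset);
    }

    /** Convenience variant that assumes the header was written as UTF-8. */
    static String decodeUtf8(byte[] headerValue) {
        return decode(headerValue, StandardCharsets.UTF_8);
    }
}
----

Inside a `RecordInterceptor`, the bytes would typically come from `record.headers().lastHeader(KafkaHeaders.LISTENER_INFO).value()`.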

The header mappers also convert to `String` when creating `MessageHeaders` from the consumer record and never map this header on an outbound record.

For POJO batch listeners, starting with version 2.8.6, the header is copied into each member of the batch and is also available as a single `String` parameter after conversion.

[source, java]
----
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
    ...
}
----

NOTE: If the batch listener has a filter and the filter results in an empty batch, you will need to add `required = false` to the `@Header` parameter because the info is not available for an empty batch.

If you receive `List<Message<Thing>>` the info is in the `KafkaHeaders.LISTENER_INFO` header of each `Message<?>`.

See xref:kafka/receiving-messages/listener-annotation.adoc#batch-listeners[Batch Listeners] for more information about consuming batches.

[[dead-letters]]
== Publishing Dead-letter Records

You can configure the `DefaultErrorHandler` and `DefaultAfterRollbackProcessor` with a record recoverer, which is called when the maximum number of failures is reached for a record.
The framework provides the `DeadLetterPublishingRecoverer`, which publishes the failed message to another topic.
The recoverer requires a `KafkaTemplate<Object, Object>`, which is used to send the record.
You can also, optionally, configure it with a `BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>`, which is called to resolve the destination topic and partition.

IMPORTANT: By default, the dead-letter record is sent to a topic named `<originalTopic>-dlt` (the original topic name suffixed with `-dlt`) and to the same partition as the original record.
Therefore, when you use the default resolver, the dead-letter topic **must have at least as many partitions as the original topic.**

If the returned `TopicPartition` has a negative partition, the partition is not set in the `ProducerRecord`, so the partition is selected by Kafka.
Starting with version 2.2.4, any `ListenerExecutionFailedException` (thrown, for example, when an exception is detected in a `@KafkaListener` method) is enhanced with the `groupId` property.
This allows the destination resolver to use the group ID, in addition to the information in the `ConsumerRecord`, to select the dead-letter topic.

The following example shows how to wire a custom destination resolver:

[source, java]
----
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
----

The record sent to the dead-letter topic is enhanced with the following headers:

* `KafkaHeaders.DLT_EXCEPTION_FQCN`: The Exception class name (generally a `ListenerExecutionFailedException`, but can be others).
* `KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN`: The Exception cause class name, if present (since version 2.8).
* `KafkaHeaders.DLT_EXCEPTION_STACKTRACE`: The Exception stack trace.
* `KafkaHeaders.DLT_EXCEPTION_MESSAGE`: The Exception message.
* `KafkaHeaders.DLT_KEY_EXCEPTION_FQCN`: The Exception class name (key deserialization errors only).
* `KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE`: The Exception stack trace (key deserialization errors only).
* `KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE`: The Exception message (key deserialization errors only).
* `KafkaHeaders.DLT_ORIGINAL_TOPIC`: The original topic.
* `KafkaHeaders.DLT_ORIGINAL_PARTITION`: The original partition.
* `KafkaHeaders.DLT_ORIGINAL_OFFSET`: The original offset.
* `KafkaHeaders.DLT_ORIGINAL_TIMESTAMP`: The original timestamp.
* `KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE`: The original timestamp type.
* `KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP`: The original consumer group that failed to process the record (since version 2.8).

Key exceptions are only caused by `DeserializationException`+++s+++ so there is no `DLT_KEY_EXCEPTION_CAUSE_FQCN`.
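
Kafka header values are byte arrays, so a consumer of the dead-letter topic has to decode them.
The following plain-Java sketch shows one way to do that, under the assumption (not stated in this section) that the offset and timestamp headers are written as 8-byte big-endian `long` values, the partition header as a 4-byte `int`, and the remaining headers as UTF-8 text.
`DltHeaderDecodingSketch` and its methods are hypothetical names; verify the encoding against the version you use before relying on it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Spring for Apache Kafka.
final class DltHeaderDecodingSketch {

    // Assumption: DLT_ORIGINAL_OFFSET and DLT_ORIGINAL_TIMESTAMP hold an 8-byte big-endian long.
    static long asLong(byte[] value) {
        return ByteBuffer.wrap(value).getLong();
    }

    // Assumption: DLT_ORIGINAL_PARTITION holds a 4-byte big-endian int.
    static int asInt(byte[] value) {
        return ByteBuffer.wrap(value).getInt();
    }

    // Assumption: topic, class name, message, and stack trace headers hold UTF-8 text.
    static String asText(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulated header values, encoded the way the assumptions above describe.
        byte[] offset = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();
        byte[] partition = ByteBuffer.allocate(Integer.BYTES).putInt(3).array();
        byte[] topic = "orders".getBytes(StandardCharsets.UTF_8);
        System.out.println(asText(topic) + "-" + asInt(partition) + "@" + asLong(offset));
    }
}
```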

There are two mechanisms to add more headers.

1. Subclass the recoverer and override `createProducerRecord()` - call `super.createProducerRecord()` and add more headers.
2. Provide a `BiFunction` to receive the consumer record and exception, returning a `Headers` object; headers from there will be copied to the final producer record; also see xref:kafka/annotation-error-handling.adoc#dlpr-headers[Managing Dead Letter Record Headers].
Use `setHeadersFunction()` to set the `BiFunction`.

The second is simpler to implement but the first has more information available, including the already assembled standard headers.

Starting with version 2.3, when used in conjunction with an `ErrorHandlingDeserializer`, the publisher will restore the record `value()`, in the dead-letter producer record, to the original value that failed to be deserialized.
Previously, the `value()` was null and user code had to decode the `DeserializationException` from the message headers.
In addition, you can provide multiple `KafkaTemplate`+++s+++ to the publisher; this might be needed, for example, if you want to publish the `byte[]` from a `DeserializationException`, as well as values using a different serializer from records that were deserialized successfully.
Here is an example of configuring the publisher with `KafkaTemplate`+++s+++ that use a `String` and `byte[]` serializer:

[source, java]
----
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}
----

The publisher uses the map keys to locate a template that is suitable for the `value()` about to be published.
A `LinkedHashMap` is recommended so that the keys are examined in order.

When publishing `null` values, and there are multiple templates, the recoverer will look for a template for the `Void` class; if none is present, the first template from the `values().iterator()` will be used.
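
The lookup described above can be sketched in plain Java as follows.
This is an illustration rather than the recoverer's actual code: it assumes that "suitable" means the map key is assignable from the value's class, it uses `String` stand-ins in place of real `KafkaTemplate` instances, and `TemplateLookupSketch` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the template lookup; T stands in for a KafkaTemplate.
final class TemplateLookupSketch {

    static <T> T select(Map<Class<?>, T> templates, Object value) {
        if (value == null) {
            // null values: prefer a template registered for Void, else the first template.
            T forNull = templates.get(Void.class);
            return forNull != null ? forNull : templates.values().iterator().next();
        }
        // Keys are examined in iteration order, which is why a LinkedHashMap is recommended.
        for (Map.Entry<Class<?>, T> entry : templates.entrySet()) {
            if (entry.getKey().isAssignableFrom(value.getClass())) {
                return entry.getValue();
            }
        }
        return null; // this sketch simply reports "no suitable template"
    }

    public static void main(String[] args) {
        Map<Class<?>, String> templates = new LinkedHashMap<>();
        templates.put(String.class, "stringTemplate");
        templates.put(byte[].class, "bytesTemplate");
        System.out.println(select(templates, "some text"));
        System.out.println(select(templates, new byte[] { 1, 2 }));
        System.out.println(select(templates, null));
    }
}
```

Under the same assumption, a broad key such as `Object.class` placed first would match every non-null value, so more specific keys belong earlier in the map.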

Starting with version 2.7, you can use the `setFailIfSendResultIsError` method so that an exception is thrown when message publishing fails.
You can also set a timeout for verifying that the send succeeded with `setWaitForSendResultTimeout`.

IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` property to `false`.

Starting with version 2.6.3, set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures.
By default, the exception type is not considered.

Starting with version 2.3, the recoverer can also be used with Kafka Streams - see xref:streams.adoc#streams-deser-recovery[Recovery from Deserialization Exceptions] for more information.

The `ErrorHandlingDeserializer` adds the deserialization exception(s) in headers `ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER` and `ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER` (using Java serialization).
By default, these headers are not retained in the message published to the dead letter topic.
Starting with version 2.7, if both the key and value fail deserialization, the original values of both are populated in the record sent to the DLT.

If incoming records are dependent on each other, but may arrive out of order, it may be useful to republish a failed record to the tail of the original topic (a limited number of times), instead of sending it directly to the dead letter topic.
See https://stackoverflow.com/questions/64646996[this Stack Overflow Question] for an example.

The following error handler configuration will do exactly that:

[source, java]
----
@Bean
public CommonErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic.DLT", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}
----
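
To make the counting in that resolver easier to follow, here is a plain-Java model of just the routing decision.
It is a sketch under stated assumptions: a `Map` stands in for the Kafka record headers (which travel with the republished record), and `RepublishRoutingSketch` and `route` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the "retries" header logic; not a Spring for Apache Kafka class.
final class RepublishRoutingSketch {

    static final int MAX_REPUBLISH = 5;

    // Called once per recovery; the headers map models headers carried along with the record.
    static String route(Map<String, byte[]> headers) {
        byte[] retries = headers.get("retries");
        if (retries == null) {
            // First failure: start the counter at 1.
            retries = new byte[] { 1 };
            headers.put("retries", retries);
        }
        else {
            // Later failures: increment the single-byte counter in place.
            retries[0]++;
        }
        return retries[0] > MAX_REPUBLISH ? "topic.DLT" : "topic";
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        for (int failure = 1; failure <= 6; failure++) {
            System.out.println("failure " + failure + " -> " + route(headers));
        }
    }
}
```

In this model the record is republished to the original topic on the first five failures and goes to the dead letter topic on the sixth.
Because the counter is a single signed byte, the approach only suits small retry limits.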

Starting with version 2.7, the recoverer checks that the partition selected by the destination resolver actually exists.
If the partition is not present, the partition in the `ProducerRecord` is set to `null`, allowing the `KafkaProducer` to select the partition.
You can disable this check by setting the `verifyPartition` property to `false`.
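
The combined effect of a negative partition and the partition check can be modeled in a few lines of plain Java.
This is only a sketch of the documented behavior, not the recoverer's code; `PartitionVerificationSketch`, `effectivePartition`, and the `partitionCount` parameter are hypothetical, with `partitionCount` standing in for the number of partitions the destination topic actually has.

```java
// Hypothetical model of how the partition placed in the ProducerRecord is decided.
final class PartitionVerificationSketch {

    // Returns the partition to set on the ProducerRecord, or null to let the producer choose.
    static Integer effectivePartition(int resolvedPartition, int partitionCount, boolean verifyPartition) {
        if (resolvedPartition < 0) {
            // A negative partition from the resolver means "do not set a partition".
            return null;
        }
        if (verifyPartition && resolvedPartition >= partitionCount) {
            // The requested partition does not exist on the destination topic.
            return null;
        }
        return Integer.valueOf(resolvedPartition);
    }

    public static void main(String[] args) {
        System.out.println(effectivePartition(2, 3, true));
        System.out.println(effectivePartition(3, 3, true));
        System.out.println(effectivePartition(-1, 3, true));
    }
}
```

With the check disabled, a non-existent partition is passed through unchanged, so the send itself is then expected to fail or time out.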

Starting with version 3.1, setting the `logRecoveryRecord` property to `true` will log the recovery record and exception.

[[dlpr-headers]]
== Managing Dead Letter Record Headers

Referring to xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records] above, the `DeadLetterPublishingRecoverer` has two properties used to manage headers when those headers already exist (such as when reprocessing a dead letter record that failed, including when using xref:retrytopic.adoc[Non-Blocking Retries]).

* `appendOriginalHeaders` (default `true`)
* `stripPreviousExceptionHeaders` (default `true` since version 2.8)

Apache Kafka supports multiple headers with the same name; to obtain the "latest" value, you can use `headers.lastHeader(headerName)`; to get an iterator over multiple headers, use `headers.headers(headerName).iterator()`.

When repeatedly republishing a failed record, these headers can grow (and eventually cause publication to fail due to a `RecordTooLargeException`); this is especially true for the exception headers and particularly for the stack trace headers.

There are two properties because, while you might want to retain only the last exception information, you might also want to retain the history of the topic(s) the record passed through for each failure.

`appendOriginalHeaders` is applied to all headers named `*ORIGINAL*` while `stripPreviousExceptionHeaders` is applied to all headers named `*EXCEPTION*`.
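
The interaction of the two properties can be illustrated with a small plain-Java model.
It is a sketch under assumptions: headers are modeled as `name=value` strings, the header names `exception` and `original-topic` are made up for the example, and it assumes that with `appendOriginalHeaders` set to `false` only the first "original" header is retained.
`DltHeaderPolicySketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of header handling when a record is republished; not framework code.
final class DltHeaderPolicySketch {

    static List<String> republish(List<String> existing, String originalTopic, String exceptionMessage,
            boolean appendOriginalHeaders, boolean stripPreviousExceptionHeaders) {

        List<String> headers = new ArrayList<>(existing);
        if (stripPreviousExceptionHeaders) {
            // Keep only the information about the latest exception.
            headers.removeIf(h -> h.startsWith("exception="));
        }
        headers.add("exception=" + exceptionMessage);

        boolean hasOriginal = headers.stream().anyMatch(h -> h.startsWith("original-topic="));
        if (appendOriginalHeaders || !hasOriginal) {
            // Appending keeps the history of topics the record has passed through.
            headers.add("original-topic=" + originalTopic);
        }
        return headers;
    }

    public static void main(String[] args) {
        List<String> first = republish(List.of(), "orders", "boom1", true, true);
        List<String> second = republish(first, "orders-dlt", "boom2", true, true);
        System.out.println(second);
    }
}
```

With both properties `true`, the second republish keeps a single exception entry but two topic entries, which is the "latest exception, full topic history" combination described above.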

Starting with version 2.8.4, you can control which of the standard headers will be added to the output record.
See the `enum HeadersToAdd` for the generic names of the (currently) 10 standard headers that are added by default (these are not the actual header names, just an abstraction; the actual header names are set up by the `getHeaderNames()` method, which subclasses can override).

To exclude headers, use the `excludeHeaders()` method; for example, to suppress adding the exception stack trace in a header, use:

[source, java]
----
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
----

In addition, you can completely customize the addition of exception headers by adding an `ExceptionHeadersCreator`; this also disables all standard exception headers.

[source, java]
----
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});
----

Also starting with version 2.8.4, you can now provide multiple headers functions, via the `addHeadersFunction` method.
This allows additional functions to apply, even if another function has already been registered, for example, when using xref:retrytopic.adoc[Non-Blocking Retries].

Also see xref:retrytopic/features.adoc#retry-headers[Failure Header Management] with xref:retrytopic.adoc[Non-Blocking Retries].

[[exp-backoff]]
== `ExponentialBackOffWithMaxRetries` Implementation

Spring Framework provides a number of `BackOff` implementations.
By default, the `ExponentialBackOff` retries indefinitely; giving up after some number of retry attempts requires calculating the `maxElapsedTime`.
Since version 2.7.3, Spring for Apache Kafka provides the `ExponentialBackOffWithMaxRetries`, a subclass that takes a `maxRetries` property and automatically calculates the `maxElapsedTime`, which is a little more convenient.

[source, java]
----
@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}
----

This will retry after `1, 2, 4, 8, 10, 10` seconds, before calling the recoverer.
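
The arithmetic behind that sequence can be checked with a short plain-Java sketch.
It does not use the Spring classes; `BackOffScheduleSketch` and its methods are hypothetical names, and the loop simply applies the multiplier and caps each delay at the maximum interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical calculator for an exponential back off schedule; not framework code.
final class BackOffScheduleSketch {

    // Returns the delay (in milliseconds) before each retry attempt.
    static List<Long> intervals(int maxRetries, long initialInterval, double multiplier, long maxInterval) {
        List<Long> result = new ArrayList<>();
        long current = Math.min(initialInterval, maxInterval);
        for (int i = 0; i < maxRetries; i++) {
            result.add(current);
            // Grow the delay for the next attempt, never exceeding the cap.
            current = Math.min((long) (current * multiplier), maxInterval);
        }
        return result;
    }

    // Total time spent waiting across all retries.
    static long total(List<Long> intervals) {
        return intervals.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<Long> schedule = intervals(6, 1_000L, 2.0, 10_000L);
        System.out.println(schedule + " total=" + total(schedule));
    }
}
```

For the configuration above, the delays sum to 35 seconds, which is the kind of figure you would otherwise have to work out by hand to cap a plain `ExponentialBackOff` through `maxElapsedTime`.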

